{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark SQL Examples\n",
    "\n",
    "Run the code cells below. This is the same code from the previous screencast."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import StringType\n",
    "from pyspark.sql.types import IntegerType\n",
    "from pyspark.sql.functions import desc\n",
    "from pyspark.sql.functions import asc\n",
    "from pyspark.sql.functions import sum as Fsum\n",
    "\n",
    "import datetime\n",
    "\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "%matplotlib inline\n",
    "import matplotlib.pyplot as plt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Data wrangling with Spark SQL\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "path = \"data/sparkify_log_small.json\"\n",
    "user_log = spark.read.json(path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='\"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36\"', userId='1046')]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "user_log.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- artist: string (nullable = true)\n",
      " |-- auth: string (nullable = true)\n",
      " |-- firstName: string (nullable = true)\n",
      " |-- gender: string (nullable = true)\n",
      " |-- itemInSession: long (nullable = true)\n",
      " |-- lastName: string (nullable = true)\n",
      " |-- length: double (nullable = true)\n",
      " |-- level: string (nullable = true)\n",
      " |-- location: string (nullable = true)\n",
      " |-- method: string (nullable = true)\n",
      " |-- page: string (nullable = true)\n",
      " |-- registration: long (nullable = true)\n",
      " |-- sessionId: long (nullable = true)\n",
      " |-- song: string (nullable = true)\n",
      " |-- status: long (nullable = true)\n",
      " |-- ts: long (nullable = true)\n",
      " |-- userAgent: string (nullable = true)\n",
      " |-- userId: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "user_log.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a View And Run Queries\n",
    "\n",
    "The code below creates a temporary view against which you can run SQL queries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "user_log.createOrReplaceTempView(\"user_log_table\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+\n",
      "|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|\n",
      "+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+\n",
      "|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|\"Mozilla/5.0 (Win...|  1046|\n",
      "|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|       Cheryl Tweedy|   200|1513720878284|\"Mozilla/5.0 (Win...|  1000|\n",
      "+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql(\"SELECT * FROM user_log_table LIMIT 2\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+\n",
      "|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|\n",
      "+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+\n",
      "|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|\"Mozilla/5.0 (Win...|  1046|\n",
      "|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|       Cheryl Tweedy|   200|1513720878284|\"Mozilla/5.0 (Win...|  1000|\n",
      "+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql('''\n",
    "          SELECT * \n",
    "          FROM user_log_table \n",
    "          LIMIT 2\n",
    "          '''\n",
    "          ).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|   10000|\n",
      "+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql('''\n",
    "          SELECT COUNT(*) \n",
    "          FROM user_log_table \n",
    "          '''\n",
    "          ).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(userID='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='No. 5'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),\n",
       " Row(userID='1046', firstname='Kenneth', page='Home', song=None),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='War on war'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),\n",
       " Row(userID='1046', firstname='Kenneth', page='Logout', song=None),\n",
       " Row(userID='1046', firstname='Kenneth', page='Home', song=None),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Heads Will Roll'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Bleed It Out [Live At Milton Keynes]'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Clocks'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Love Rain'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song=\"Ry Ry's Song (Album Version)\"),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='The Invisible Man'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Ask The Mountains'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Given Up (Album Version)'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='El Cuatrero'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Hero/Heroine'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Spring'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Rising Moon'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Tough Little Boys'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song=\"Qu'Est-Ce Que T'Es Belle\"),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Secrets'),\n",
       " Row(userID='1046', firstname='Kenneth', page='NextSong', song='Under The Gun')]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.sql('''\n",
    "          SELECT userID, firstname, page, song\n",
    "          FROM user_log_table \n",
    "          WHERE userID == '1046'\n",
    "          '''\n",
    "          ).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------------+\n",
      "|            page|\n",
      "+----------------+\n",
      "|           About|\n",
      "|       Downgrade|\n",
      "|           Error|\n",
      "|            Help|\n",
      "|            Home|\n",
      "|           Login|\n",
      "|          Logout|\n",
      "|        NextSong|\n",
      "|   Save Settings|\n",
      "|        Settings|\n",
      "|Submit Downgrade|\n",
      "|  Submit Upgrade|\n",
      "|         Upgrade|\n",
      "+----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql('''\n",
    "          SELECT DISTINCT page\n",
    "          FROM user_log_table \n",
    "          ORDER BY page ASC\n",
    "          '''\n",
    "          ).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# User Defined Functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<function __main__.<lambda>(x)>"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.udf.register(\"get_hour\", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='\"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36\"', userId='1046', hour='22')]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.sql('''\n",
    "          SELECT *, get_hour(ts) AS hour\n",
    "          FROM user_log_table \n",
    "          LIMIT 1\n",
    "          '''\n",
    "          ).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "songs_in_hour = spark.sql('''\n",
    "          SELECT get_hour(ts) AS hour, COUNT(*) as plays_per_hour\n",
    "          FROM user_log_table\n",
    "          WHERE page = \"NextSong\"\n",
    "          GROUP BY hour\n",
    "          ORDER BY cast(hour as int) ASC\n",
    "          '''\n",
    "          )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+--------------+\n",
      "|hour|plays_per_hour|\n",
      "+----+--------------+\n",
      "|   0|           456|\n",
      "|   1|           454|\n",
      "|   2|           382|\n",
      "|   3|           302|\n",
      "|   4|           352|\n",
      "|   5|           276|\n",
      "|   6|           348|\n",
      "|   7|           358|\n",
      "|   8|           375|\n",
      "|   9|           249|\n",
      "|  10|           216|\n",
      "|  11|           228|\n",
      "|  12|           251|\n",
      "|  13|           339|\n",
      "|  14|           462|\n",
      "|  15|           479|\n",
      "|  16|           484|\n",
      "|  17|           430|\n",
      "|  18|           362|\n",
      "|  19|           295|\n",
      "+----+--------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "songs_in_hour.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Converting Results to Pandas"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "songs_in_hour_pd = songs_in_hour.toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>hour</th>\n",
       "      <th>plays_per_hour</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>0</td>\n",
       "      <td>456</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>1</td>\n",
       "      <td>454</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>2</td>\n",
       "      <td>382</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>3</td>\n",
       "      <td>302</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>4</td>\n",
       "      <td>352</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>5</th>\n",
       "      <td>5</td>\n",
       "      <td>276</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>6</th>\n",
       "      <td>6</td>\n",
       "      <td>348</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>7</th>\n",
       "      <td>7</td>\n",
       "      <td>358</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>8</th>\n",
       "      <td>8</td>\n",
       "      <td>375</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>9</th>\n",
       "      <td>9</td>\n",
       "      <td>249</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>10</th>\n",
       "      <td>10</td>\n",
       "      <td>216</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>11</th>\n",
       "      <td>11</td>\n",
       "      <td>228</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>12</th>\n",
       "      <td>12</td>\n",
       "      <td>251</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>13</th>\n",
       "      <td>13</td>\n",
       "      <td>339</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>14</th>\n",
       "      <td>14</td>\n",
       "      <td>462</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>15</th>\n",
       "      <td>15</td>\n",
       "      <td>479</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>16</th>\n",
       "      <td>16</td>\n",
       "      <td>484</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>17</th>\n",
       "      <td>17</td>\n",
       "      <td>430</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>18</th>\n",
       "      <td>18</td>\n",
       "      <td>362</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>19</th>\n",
       "      <td>19</td>\n",
       "      <td>295</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>20</th>\n",
       "      <td>20</td>\n",
       "      <td>257</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>21</th>\n",
       "      <td>21</td>\n",
       "      <td>248</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>22</th>\n",
       "      <td>22</td>\n",
       "      <td>369</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>23</th>\n",
       "      <td>23</td>\n",
       "      <td>375</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   hour  plays_per_hour\n",
       "0     0             456\n",
       "1     1             454\n",
       "2     2             382\n",
       "3     3             302\n",
       "4     4             352\n",
       "5     5             276\n",
       "6     6             348\n",
       "7     7             358\n",
       "8     8             375\n",
       "9     9             249\n",
       "10   10             216\n",
       "11   11             228\n",
       "12   12             251\n",
       "13   13             339\n",
       "14   14             462\n",
       "15   15             479\n",
       "16   16             484\n",
       "17   17             430\n",
       "18   18             362\n",
       "19   19             295\n",
       "20   20             257\n",
       "21   21             248\n",
       "22   22             369\n",
       "23   23             375"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "songs_in_hour_pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
